package test.long_connection;

import io.netty.channel.Channel;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.concurrent.ScheduledFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

public class LiveHandler extends SimpleChannelInboundHandler<LiveMessage> { // 1

    private static Map<Integer, LiveChannelCache> channelCache = new HashMap<>();
    private Logger                                logger       = LoggerFactory.getLogger( LiveHandler.class );

    @Override
    protected void channelRead0( ChannelHandlerContext ctx, LiveMessage msg ) throws Exception {
        Channel channel = ctx.channel();
        final int hashCode = channel.hashCode();
        logger.debug( "channel hashCode:" + hashCode + " msg:" + msg + " cache:" + channelCache.size() );

        if ( !channelCache.containsKey( hashCode ) ) {
            logger.debug( "channelCache.containsKey(hashCode), put key:" + hashCode );
            channel.closeFuture()
                    .addListener( future -> {
                        logger.debug( "channel close, remove key:" + hashCode );
                        channelCache.remove( hashCode );
                    } );
            ScheduledFuture<?> scheduledFuture = ctx.executor()
                    .schedule( () -> {
                        logger.debug( "schedule runs, close channel:" + hashCode );
                        channel.close();
                    }, 10, TimeUnit.SECONDS );
            channelCache.put( hashCode, new LiveChannelCache( channel, scheduledFuture ) );
        }

        switch ( msg.getType() ) {
            case LiveMessage.TYPE_HEART:
                LiveChannelCache cache = channelCache.get( hashCode );
                ScheduledFuture<?> scheduledFuture = ctx.executor()
                        .schedule( () -> channel.close(), 5, TimeUnit.SECONDS );
                cache.getScheduledFuture()
                        .cancel( true ); // 将原来的取消
                cache.setScheduledFuture( scheduledFuture ); // 添加新的
                ctx.channel()
                        .writeAndFlush( msg );
                break;
            case LiveMessage.TYPE_MESSAGE:
                channelCache.entrySet()
                        .stream()
                        .forEach( entry -> {
                            Channel otherChannel = entry.getValue()
                                    .getChannel();
                            otherChannel.writeAndFlush( msg );
                        } );
                break;
        }
    }

    @Override
    public void channelReadComplete( ChannelHandlerContext ctx ) throws Exception {
        logger.debug( "channelReadComplete" );
        super.channelReadComplete( ctx );
    }

    @Override
    public void exceptionCaught( ChannelHandlerContext ctx, Throwable cause ) throws Exception {
        logger.debug( "exceptionCaught" );
        if ( null != cause )
            cause.printStackTrace();
        if ( null != ctx )
            ctx.close();
    }
}
